1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.mockito.Matchers.any;
19  import static org.mockito.Mockito.mock;
20  import static org.mockito.Mockito.times;
21  import static org.mockito.Mockito.verify;
22  
23  import java.util.concurrent.CountDownLatch;
24  
25  import org.junit.Test;
26  import org.mockito.Mockito;
27  
28  import rx.Observable;
29  import rx.Observer;
30  import rx.functions.Func1;
31  import rx.internal.util.RxRingBuffer;
32  import rx.observers.TestSubscriber;
33  
34  public class OperatorFilterTest {
35  
36      @Test
37      public void testFilter() {
38          Observable<String> w = Observable.just("one", "two", "three");
39          Observable<String> observable = w.filter(new Func1<String, Boolean>() {
40  
41              @Override
42              public Boolean call(String t1) {
43                  return t1.equals("two");
44              }
45          });
46  
47          @SuppressWarnings("unchecked")
48          Observer<String> observer = mock(Observer.class);
49          observable.subscribe(observer);
50          verify(observer, Mockito.never()).onNext("one");
51          verify(observer, times(1)).onNext("two");
52          verify(observer, Mockito.never()).onNext("three");
53          verify(observer, Mockito.never()).onError(any(Throwable.class));
54          verify(observer, times(1)).onCompleted();
55      }
56  
57      /**
58       * Make sure we are adjusting subscriber.request() for filtered items
59       */
60      @Test(timeout = 500)
61      public void testWithBackpressure() throws InterruptedException {
62          Observable<String> w = Observable.just("one", "two", "three");
63          Observable<String> o = w.filter(new Func1<String, Boolean>() {
64  
65              @Override
66              public Boolean call(String t1) {
67                  return t1.equals("three");
68              }
69          });
70  
71          final CountDownLatch latch = new CountDownLatch(1);
72          TestSubscriber<String> ts = new TestSubscriber<String>() {
73  
74              @Override
75              public void onCompleted() {
76                  System.out.println("onCompleted");
77                  latch.countDown();
78              }
79  
80              @Override
81              public void onError(Throwable e) {
82                  e.printStackTrace();
83                  latch.countDown();
84              }
85  
86              @Override
87              public void onNext(String t) {
88                  System.out.println("Received: " + t);
89                  // request more each time we receive
90                  request(1);
91              }
92  
93          };
94          // this means it will only request "one" and "two", expecting to receive them before requesting more
95          ts.requestMore(2);
96  
97          o.subscribe(ts);
98  
99          // this will wait forever unless OperatorTake handles the request(n) on filtered items
100         latch.await();
101     }
102 
103     /**
104      * Make sure we are adjusting subscriber.request() for filtered items
105      */
106     @Test(timeout = 500000)
107     public void testWithBackpressure2() throws InterruptedException {
108         Observable<Integer> w = Observable.range(1, RxRingBuffer.SIZE * 2);
109         Observable<Integer> o = w.filter(new Func1<Integer, Boolean>() {
110 
111             @Override
112             public Boolean call(Integer t1) {
113                 return t1 > 100;
114             }
115         });
116 
117         final CountDownLatch latch = new CountDownLatch(1);
118         final TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
119             
120             @Override
121             public void onCompleted() {
122                 System.out.println("onCompleted");
123                 latch.countDown();
124             }
125             
126             @Override
127             public void onError(Throwable e) {
128                 e.printStackTrace();
129                 latch.countDown();
130             }
131             
132             @Override
133             public void onNext(Integer t) {
134                 System.out.println("Received: " + t);
135                 // request more each time we receive
136                 request(1);
137             }
138         };
139         // this means it will only request 1 item and expect to receive more
140         ts.requestMore(1);
141 
142         o.subscribe(ts);
143 
144         // this will wait forever unless OperatorTake handles the request(n) on filtered items
145         latch.await();
146     }
147 }